package com.xnx3.obs.sources;

import com.huaweicloud.dis.util.PartitionCursorTypeEnum;
import com.xnx3.obs.config.DISReaderConfig;
import com.xnx3.obs.config.HoodieWriterConfig;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.spark.sql.SaveMode;

/**
 * Fetch DIS notification and write to Hudi demo
 * @author aly
 */
public class TestDISEventSource {

    public static void main(String[] args) {
        // Configure Hudi parameters
        HoodieWriterConfig config = new HoodieWriterConfig()
                .basePath("obs://hudi-test-target/hudi_dis_cow")
                .tableName("hudi_dis_cow")
                .saveMode(SaveMode.Append)
                .keyGenerator(NonpartitionedKeyGenerator.class.getName())
                .recordkeyFieldOptKey("partitionKey")
                .precombineFieldOptKey("timestamp");

        // Configure DIS parameters
        DISReaderConfig dicConfig = new DISReaderConfig()
                .streamName("hudi-dis-test")
                .partitionId("0")
                .startingSequenceNumber("0")
                // Configure how to download data
                // AT_SEQUENCE_NUMBER: Obtaining from the specified sequenceNumber requires setting GetPartitionCursorRequest.setStartingSequenceNumber
                // AFTER_SEQUENCE_NUMBER: Starting from the specified sequenceNumber, you need to set GetPartitionCursorRequest.setStartingSequenceNumber
                // TRIM_HORIZON: Get from the oldest record
                // LATEST: Get from the latest record
                // AT_TIMESTAMP: Obtaining from the specified timestamp (13 bits) requires setting GetPartitionCursorRequest.setTimestamp
                .cursorType(PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name());

        // Get data and write
        new DISEventSource().fetchEvents(config, dicConfig);
    }

}
